package com.broker.utils.events.support.kafka;

import com.broker.base.utils.ObjectUtils;
import com.broker.utils.events.support.IMQProducer;
import com.broker.utils.events.support.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * {@link IMQProducer} implementation that publishes messages through an Apache Kafka
 * {@link Producer} with {@code String} keys and {@code String} values.
 *
 * <p>The underlying Kafka producer is created eagerly in the constructor from the
 * server URL supplied by {@link MQConfig#getMqServerUrl()} and is released by
 * {@link #stop()}.
 */
@Slf4j
public class BrokerKafkaProducer implements IMQProducer {
    /** Maximum time, in milliseconds, to wait for the send acknowledgement. */
    private static final long SEND_TIMEOUT_MS = 1000L;

    private final MQConfig kafkaConfig;
    private final Producer<String, String> producer;

    /**
     * Creates the producer and immediately instantiates the underlying Kafka client.
     *
     * @param kafkaConfig configuration providing the bootstrap server URL; must not be null
     * @throws NullPointerException if {@code kafkaConfig} is null
     */
    public BrokerKafkaProducer(MQConfig kafkaConfig) {
        this.kafkaConfig = java.util.Objects.requireNonNull(kafkaConfig, "kafkaConfig");
        this.producer = createProducer(this.kafkaConfig);
    }

    /**
     * Builds the Kafka producer from fixed settings plus the configured server URL.
     *
     * @param config source of the {@code bootstrap.servers} value
     * @return a new producer using string serializers for both key and value
     */
    private static Producer<String, String> createProducer(MQConfig config) {
        Properties props = new Properties();
        props.put("bootstrap.servers", config.getMqServerUrl());
        // Strongest acknowledgement setting; combined with retries=0 a failed send
        // is surfaced to the caller instead of being retried by the client.
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    /**
     * Sends a message and blocks until it is acknowledged or the timeout elapses.
     *
     * <p>The record key is the fully qualified class name of {@code msg}; the record
     * value is the result of {@code ObjectUtils.json(msg)} (presumably a JSON
     * rendering of the object - confirm against that helper).
     *
     * @param topic destination topic; must not be null
     * @param msg   payload object; must not be null
     * @throws NullPointerException if {@code topic} or {@code msg} is null
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if no acknowledgement arrives within
     *                              {@value #SEND_TIMEOUT_MS} ms
     * @throws ExecutionException   if the send itself failed
     */
    public void sendMessage(String topic, Object msg) throws InterruptedException,TimeoutException, ExecutionException {
        java.util.Objects.requireNonNull(topic, "topic");
        java.util.Objects.requireNonNull(msg, "msg");

        String key = msg.getClass().getName();
        String value = ObjectUtils.json(msg);
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, value));
        RecordMetadata recordMetadata = future.get(SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        // Parameterized logging: the message is only formatted when debug is enabled.
        log.debug("Kafka: send message offset:{}    TOPIC:{}   OBJECT:{}    status: OK",
                recordMetadata.offset(), topic, key);
    }

    /** No-op: the Kafka producer is already created in the constructor. */
    public void start(){
    }

    /** Closes the underlying Kafka producer. */
    public void stop(){
        this.producer.close();
    }
}
